package com.pai4j.connect.websocket.handler;
import com.pai4j.common.enums.MessageTypeEnum;
import com.pai4j.common.enums.VideoSDPMessageType;
import com.pai4j.common.enums.messagequeue.MessageBroadChannelEnum;
import com.pai4j.common.service.messagequeue.producer.MessageQueueProducer;
import com.pai4j.common.util.JsonUtil;
import com.pai4j.connect.service.GroupVideoBizService;
import com.pai4j.domain.vo.request.mess.MessageNotifyVO;
import com.pai4j.domain.vo.request.mess.VideoChatMessageRequestVO;
import com.pai4j.domain.vo.request.mess.VideoUserChangeMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class GroupVideoWebSocketEndPointServletHandler extends TextWebSocketHandler {

    @Autowired
    private MessageQueueProducer<Object, Long> messageQueueProducer;

    @Autowired
    private GroupVideoBizService groupVideoBizService;

    /**
     * Map<KEY1, Map<KEY2, WebSocketSession>>
     *
     * KEY1：用于区分场景，如：PC-APP、 PC-CHAT、WAP-APP ，详见：WebSocketBizCodeEnum
     * KEY2：group id 群聊id，实现用户ws链接隔离（类似房间号）
     * KEY3: 用户id
     * WebSocketSession: 不同场景下用户的WebSocket Session对象 -- 全双工、有状态
     */
    public static Map<String, Map<String, Map<String, WebSocketSession>>> sessions = new ConcurrentHashMap<>();

    /**
     * 连接建立
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 获取路径参数
        Map<String, Object> attributes = session.getAttributes();
        String bizCode = (String) attributes.get("bizCode");
        String groupId = (String) attributes.get("groupId");
        String userId = (String) attributes.get("userId");
        Map<String, Map<String, WebSocketSession>> groupUserSessions = sessions.get(bizCode);
        Map<String, WebSocketSession> userSessions;
        if (groupUserSessions == null) {
            // 服务重新启动，首次用户发起WS链接
            groupUserSessions = new ConcurrentHashMap<>();
            userSessions = new ConcurrentHashMap<>();
            groupUserSessions.put(groupId, userSessions);
            sessions.put(bizCode, groupUserSessions);
        } else {
            // 初始化处理首页群聊下多人音视频ws session结构
            userSessions = groupUserSessions.get(groupId);
        }
        userSessions.put(userId, session);
        log.info("GroupVideo WebSocket connection ====> groupId:{} userId:{}, sessionId:{}", groupId, userId, session.getId());
        // 记录当前在线用户到redis
        groupVideoBizService.online(groupId, userId);
        // 新用户加入群聊音视频，通知其他在线用户（创建已在线跟新上线用的webRTC链接）--- MESH
        this.doNotifyUserChangeEventForCurrOnlineUsers(groupId, userId, true);
    }


    /**
     * 断开连接
     *
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        System.out.println("GroupVideo WebSocket Connection closed: " + session.getId());
        Map<String, Object> attributes = session.getAttributes();
        String bizCode = (String) attributes.get("bizCode");
        String groupId = (String) attributes.get("groupId");
        String userId = (String) attributes.get("userId");
        // 剔除当前离线线用户
        groupVideoBizService.offline(groupId, userId);
        // 通知群聊音视频其他在线用户有人下线
        this.doNotifyUserChangeEventForCurrOnlineUsers(groupId, userId, false);
    }

    /**
     * 收到消息
     *
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        System.out.println("GroupVideo WebSocket Connection Received message: " + payload);
        if ("ping".equals(payload)) {
            // 心跳检测
            return;
        }
        VideoChatMessageRequestVO videoMessage = JsonUtil.fromJson(payload, VideoChatMessageRequestVO.class);
        String groupId = videoMessage.getGroupId();
        if (StringUtils.isBlank(groupId)) {
            return;
        }
        if (VideoSDPMessageType.call.name().equals(videoMessage.getType())) {
            /**
             * 1. 给被呼叫人推送呼叫提醒，广播音视频呼叫信息给被呼人
             */
            MessageNotifyVO messageBase = new MessageNotifyVO();
            // 这里角色反转，引入消息代理人机制（同理同群聊多人对话）
            messageBase.setProxySenderId(videoMessage.getUserId());
            messageBase.setSenderId(groupId);
            messageBase.setType(MessageTypeEnum.VIDEO_CALL.getType());
            messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_CALL, messageBase);
            return;
        }
        /**
         * 2. 信命推送
         */
        messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_VIDEO_SDP, videoMessage);
    }

    /**
     * 通知当前群聊多人音视频在线用户：有新用户加入音视频群聊/离线
     */
    private void doNotifyUserChangeEventForCurrOnlineUsers(String groupId, String eventUser, boolean isOnline) {

        VideoUserChangeMessageVO message = new VideoUserChangeMessageVO();
        message.setEventUserId(eventUser);
        message.setGroupId(groupId);
        message.setIsOnline(isOnline);
        messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_USER_CHANGE, message);
    }

    public static WebSocketSession getSession(String bizCode, String groupId, String userId) {
        if (StringUtils.isBlank(bizCode) || StringUtils.isBlank(userId)) {
            return null;
        }
        Map<String, Map<String, WebSocketSession>> groupUserSessions = sessions.get(bizCode);
        if (MapUtils.isEmpty(groupUserSessions)) {
            return null;
        }
        Map<String, WebSocketSession> userSessions = groupUserSessions.get(groupId);
        if (MapUtils.isEmpty(userSessions)) {
            return null;
        }
        return userSessions.get(userId);
    }

    public static WebSocketSession getSession(String groupId, String userId) {
        if (MapUtils.isEmpty(sessions)) {
            return null;
        }
        for (Map.Entry<String, Map<String, Map<String, WebSocketSession>>> session : sessions.entrySet()) {
            Map<String, Map<String, WebSocketSession>> groupUserSessions = session.getValue();
            Map<String, WebSocketSession> userSessions = groupUserSessions.get(groupId);
            WebSocketSession userSession = userSessions.get(userId);
            if (userSession != null) {
                return userSession;
            }
        }
        return null;
    }
}
